package com.xiaoxin.executor.batch.job.BlogInfo;


import com.xiaoxin.executor.batch.listener.MyJobListener;
import com.xiaoxin.executor.batch.listener.MyReadListener;
import com.xiaoxin.executor.batch.listener.MyWriteListener;
import com.xiaoxin.executor.batch.processor.MyItemProcessor;
import com.xiaoxin.executor.batch.validator.MyBeanValidator;
import com.xiaoxin.executor.domain.BlogInfo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;

@Component()
public class BlogInfoTask {

    @Autowired
    private MyJobListener myJobListener;

    /**
     * 定义job
     *
     * @param jobs
     * @param myStep
     * @return
     */
    @Bean
    public Job myJob(JobBuilderFactory jobs, Step myStep) {
        return jobs.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(myJobListener)
                .build();
    }

    /**
     * ItemReader定义：读取文件数据+entirty实体类映射
     *
     * @return
     */
    @Bean
    public ItemReader<BlogInfo> reader() {
        // 使用FlatFileItemReader去读cvs文件，一行即一条数据
        FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
        // 设置文件处在路径
        reader.setResource(new ClassPathResource("file/bloginfo.csv"));
        // entity与csv数据做映射
        reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[]{"blogAuthor", "blogUrl", "blogTitle", "blogItem"});
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
                    {
                        setTargetType(BlogInfo.class);
                    }
                });
            }
        });
        return reader;
    }


    /**
     * 注册ItemProcessor: 处理数据+校验数据
     *
     * @return
     */
    @Bean
    public ItemProcessor<BlogInfo, BlogInfo> processor() {
        MyItemProcessor myItemProcessor = new MyItemProcessor();
        // 设置校验器
        myItemProcessor.setValidator(new MyBeanValidator<BlogInfo>());
        return myItemProcessor;
    }

    /**
     * ItemWriter定义：指定datasource，设置批量插入sql语句，写入数据库
     *
     * @param masterDataSource
     * @return
     */
    @Bean
    public ItemWriter<BlogInfo> writer(@Qualifier(value = "masterDataSource") DataSource masterDataSource) {
        // 使用jdbcBcatchItemWrite写数据到数据库中
        JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
        // 设置有参数的sql语句
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
        String sql = "insert into bloginfo " + " (blogAuthor,blogUrl,blogTitle,blogItem) "
                + " values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
        writer.setSql(sql);
        writer.setDataSource(masterDataSource);
        return writer;
    }

    /**
     * step定义：
     * 包括
     * ItemReader 读取
     * ItemProcessor  处理
     * ItemWriter 输出
     *
     * @param stepBuilderFactory
     * @param reader
     * @param writer
     * @param processor          首先看到我们代码出现的第一个设置，chunk( 6500 ) ，Chunk的机制(即每次读取一条数据，再处理一条数据，累积到一定数量后再一次性交给writer进行写入操作。
     *                           没错，对于整个step环节，就是数据的读取，处理最后到输出。
     *                           这个chunk机制里，我们传入的 6500，也就是是告诉它，读取处理数据，累计达到 6500条进行一次批次处理，去执行写入操作。
     *                           这个传值，是根据具体业务而定，可以是500条一次，1000条一次，也可以是20条一次，50条一次。
     *                           没错，这个就是设置重试，当出现异常的时候，重试多少次。我们设置为3，也就是说当一条数据操作失败，那我们会对这条数据进行重试3次，还是失败就是 当做失败了， 那么我们如果有配置skip（推荐配置使用），那么这个数据失败记录就会留到给 skip 来处理。
     *                           skip(Exception.class).skipLimit(2)
     *                           skip，跳过，也就是说我们如果设置3， 那么就是可以容忍 3条数据的失败。只有达到失败数据达到3次，我们才中断这个step。
     *                           对于失败的数据，我们做了相关的监听器以及异常信息记录，供与后续手动补救。
     * @return
     */
    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
                       ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor) {
        return stepBuilderFactory
                .get("myStep")
                .<BlogInfo, BlogInfo>chunk(65000) // Chunk的机制(即每次读取一条数据，再处理一条数据，累积到一定数量后再一次性交给writer进行写入操作)
                .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
                .listener(new MyReadListener())
                .processor(processor)
                .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
                .listener(new MyWriteListener())
                .build();
    }

}
